#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <unistd.h>

#include <err.h>
#include <event.h>

#include <list>
using std::list;

#include <string>

#include "../proto/rawdata.pb.h"

// TCP port the parent (aggregator) process listens on in main().
#define SERVER_PORT 7005
// TCP port the child (dispatcher) process listens on in main().
// NOTE(review): despite the "AGG" prefix this is the dispatch-side port.
#define AGG_SERVER_PORT 7004
// Presumably the size, in bytes, of one message buffer — TODO confirm.
#define MAX_DATA_LEN 512

// The two ends of the anonymous pipe used for parent -> child communication.
// Both are assigned in main() from the descriptors returned by pipe().
int parent_write;
int child_read;

// NOTE(review): the three variables below are not referenced anywhere else
// in the visible source.
int debug = 0;
int agg_client_nm = 0;
int dis_client_nm = 0;

// State kept for one accepted TCP connection. Instances are heap-allocated
// by the accept callbacks and released by the error callbacks.
struct client
{
    // Socket descriptor returned by accept().
    int fd;
    // libevent bufferevent created on top of fd.
    struct bufferevent * buf_ev;
};

list<client*> dis_clients; // currently connected data-collection (dispatch) clients

/**
 * Put a file descriptor into non-blocking mode.
 *
 * Reads the current file status flags with F_GETFL, adds O_NONBLOCK and
 * stores the result with F_SETFL.
 *
 * @param fd  Descriptor to modify.
 * @return 0 on success, -1 if either fcntl() call fails (errno is left as
 *         set by fcntl()).
 */
int setnonblock(int fd) {
    const int flags = fcntl(fd, F_GETFL);
    if (flags < 0) {
        return -1;
    }

    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        return -1;
    }

    return 0;
}

void child_pip_read_callback(struct bufferevent * bev,  void *arg){
    fprintf(stderr, "dis client NUM: %ld\n", dis_clients.size());
    fprintf(stderr, "child pip read call back\n");
    evbuffer * input = bev->input;


    uint32_t datalen = 0;
    char databuff[512];
    memset(databuff, 0, 512);
    evbuffer_remove(input, &datalen, sizeof(uint32_t));
    evbuffer_remove(input, databuff, datalen);


    if(dis_clients.size() <= 0){

        senslayer::Rawdata rawdata;
        rawdata.ParseFromString(std::string(databuff, datalen));
        fprintf(stderr,"%ld %d %ld %lf \n", rawdata.timestamp(), 
                rawdata.txid(), rawdata.rxid(), rawdata.rss());
    }else{
        list<client *>::iterator it;
        for(it = dis_clients.begin(); it != dis_clients.end(); ++it){

            char write_buff[512];
            memset(write_buff, 0, 512);
            memcpy(write_buff, &datalen, sizeof(uint32_t));
            memcpy(write_buff+sizeof(uint32_t), databuff, datalen);

            bufferevent_write((*it)->buf_ev, write_buff, sizeof(uint32_t)+datalen);


        }
    }

}

//如果每个客户端的写入端能够写入数据,则通过去读pipe_in 的数据写入
//每个数据采集客户端----但愿数据不会乱
void dis_client_write_callback(struct bufferevent * bev,  void * arg) {
    //fprintf(stderr, "child write callback\n");
    struct client * dis_client = (struct client *) arg;

    uint32_t datalen = 0;
    char databuff[512];
    memset(databuff, 0, 512);
    read(child_read, &datalen, sizeof(uint32_t));
    read(child_read, databuff, datalen);


    senslayer::Rawdata rawdata;
    rawdata.ParseFromString(std::string(databuff, datalen));
    fprintf(stderr,"return for client %ld %d %ld %lf \n", rawdata.timestamp(), 
            rawdata.txid(), rawdata.rxid(), rawdata.rss());



    //保证顺序发送
    //FIXME: 未处理写入的异常问题, 例如写入不全,或者客户端断连

    char write_buff[512];
    memset(write_buff, 0, 512);
    memcpy(write_buff, &datalen, sizeof(uint32_t));
    memcpy(write_buff+sizeof(uint32_t), databuff, datalen);

    bufferevent_write(dis_client->buf_ev, write_buff, sizeof(uint32_t)+datalen);


    return ;
}


//针对树莓派客户端的发送数据进行管道输送, 转发给子进程
void agg_client_read_callback(struct bufferevent *incoming, void * argc) {

    fprintf(stderr, "agg client read callback\n");
    evbuffer * input = incoming->input;
    evbuffer * output = incoming->output;


    uint32_t datalen = 0;
    char databuff[512];
    memset(databuff, 0, 512);
    evbuffer_remove(input, &datalen, sizeof(uint32_t));
    evbuffer_remove(input, databuff, datalen);

    /*
       senslayer::Rawdata rawdata;
       rawdata.ParseFromString(std::string(databuff, datalen));
       fprintf(stderr,"%ld %d %ld %lf \n", rawdata.timestamp(), 
       rawdata.txid(), rawdata.rxid(), rawdata.rss());
       */

    //FIXME: 需要保证发送的顺序和准确性,全部发送或者不发,防止多个socket同时对一个流
    //进行读写时,发生混乱,未能处理异常问题
    //保证顺序发送
    char write_buff[512];
    memset(write_buff, 0, 512);
    memcpy(write_buff, &datalen, sizeof(uint32_t));
    memcpy(write_buff+sizeof(uint32_t), databuff, datalen);

    write(parent_write, write_buff, datalen+sizeof(uint32_t));
    return;
}

void dis_client_error_callback(struct bufferevent * bev, short what, void * arg) {
    fprintf(stderr, "dispatch client error callback exec\n");
    if(what & BEV_EVENT_EOF)
        fprintf(stderr, "dispatch clinet has been closed\n");

    if(what & BEV_EVENT_ERROR)
        fprintf(stderr,"dispatch client transfer error\n");

    struct client * dis_client  = (struct client *) arg;
    dis_clients.remove(dis_client);
    bufferevent_free(dis_client->buf_ev);
    close(dis_client->fd);
    free(dis_client);
    return;
}

/**
 * Event callback for an aggregation client's bufferevent.
 *
 * Logs which condition fired, then releases the client's bufferevent,
 * socket and struct.
 *
 * @param bev   bufferevent that raised the event (not used directly).
 * @param what  BEV_EVENT_* flags describing the event.
 * @param arg   The struct client * registered with the bufferevent.
 */
void agg_client_error_callback(struct bufferevent * bev, short what, void * arg) {
    struct client * const gone = static_cast<struct client *>(arg);

    fprintf(stderr, "agg client callback exec\n");

    if ((what & BEV_EVENT_EOF) != 0) {
        fprintf(stderr, "agg client has been closed\n");
    }
    if ((what & BEV_EVENT_ERROR) != 0) {
        fprintf(stderr,"agg client transfer error\n");
    }

    bufferevent_free(gone->buf_ev);
    close(gone->fd);
    free(gone);
}
// Accepts connection requests from the Raspberry Pi clients.
/**
 * Accept callback for the aggregation listening socket.
 *
 * Accepts one pending connection, switches it to non-blocking mode and
 * wraps it in a bufferevent whose read callback is agg_client_read_callback
 * and whose event callback is agg_client_error_callback. On any failure
 * after accept() the new socket is closed again and nothing is leaked.
 *
 * @param fd   Listening socket that became readable.
 * @param ev   Event flags (unused).
 * @param arg  Unused.
 */
void agg_server_accept_callback(int fd, short ev, void * arg) {
    (void)ev;
    (void)arg;

    struct sockaddr_in agg_client_addr;
    socklen_t agg_client_len = sizeof(agg_client_addr);

    const int agg_client_fd = accept(fd,
            (struct sockaddr *)&agg_client_addr,
            &agg_client_len);

    if (agg_client_fd < 0) {
        fprintf(stderr,"Client: accept() failed\n");
        return;
    }

    fprintf(stderr, "get a agg client\n");

    setnonblock(agg_client_fd);

    struct client * agg_client =
        (struct client *)calloc(1, sizeof(struct client));

    if (agg_client == NULL) {
        fprintf(stderr, "malloc failed\n");
        close(agg_client_fd);
        return;
    }

    agg_client->fd = agg_client_fd;

    agg_client->buf_ev = bufferevent_new(
            agg_client_fd, 
            agg_client_read_callback,
            NULL,
            agg_client_error_callback,
            agg_client);

    if (agg_client->buf_ev == NULL) {
        fprintf(stderr, "bufferevent_new failed for agg client\n");
        close(agg_client_fd);
        free(agg_client);
        return;
    }

    bufferevent_enable(agg_client->buf_ev, EV_READ);

}

//接受来自数据采集客户端的请求
void dis_server_accept_callback(int fd, short ev, void * arg){
    fprintf(stderr, "child accept call back\n");


    struct client *dis_client;
    dis_client = (struct client*)calloc(0, sizeof(struct client));

    if(dis_client == NULL)
        fprintf(stderr, "calloc dis_client failed\n");

    struct sockaddr_in dis_client_addr;
    socklen_t client_len = sizeof(dis_client_addr);


    int dis_client_fd = accept(fd,
            (struct sockaddr *)&dis_client_addr,
            &client_len);

    if (dis_client_fd < 0) {
        fprintf(stderr,"child Client: accept() failed\n");
        return;
    }


    fprintf(stderr, "get a dispatch client\n");

    setnonblock(dis_client_fd);

    dis_client->fd = dis_client_fd;
    dis_client->buf_ev= 
        (struct bufferevent *)calloc(0, sizeof(struct bufferevent));

    dis_clients.push_back(dis_client);
    //pipe读取流关联上每一个数据采集客户端,并使用write_call_back,写入每一个连接
    //到7004的客户端,用于客户端的读取解析,其中数据并非多次拷贝,每个客户端只能
    //获得最新的数据,不能保证所有客户端的数据是同样的
    dis_client->buf_ev = bufferevent_new(
            dis_client_fd,
            NULL, 
            NULL,
            dis_client_error_callback, 
            dis_client);

    bufferevent_enable(dis_client->buf_ev, EV_READ | EV_WRITE);

    return;
}

int main(int argc, char const *argv[])
{
    if(LIBEVENT_VERSION_NUMBER < 2)
        fprintf(stderr, "Support Only By Libevent Version >= 2\n");

    int pipe_fd[2];
    pid_t pid;
    if(pipe(pipe_fd) < 0){
        fprintf(stderr, "pipe create failed\n");
        exit(-1);
    }
    child_read = pipe_fd[0];
    parent_write = pipe_fd[1];

    //子进程用于数据分发
    if((pid = fork()) == 0){


        //子进程用于手机所有数据信息并发送到7004端口,通过pipe与父进程通信
        fprintf(stderr, "data dispatch is ok! port: 7004 child proc %d\n", pid);
        close(pipe_fd[1]);

        //为子进程建立一个event 工作流程
        event_init();
        struct event dis_server_accept_event;
        struct event child_pipe_read_event;

        //为管道读数据端建立一个事件，用于不断更新数据，
        //保证传感器的数据实时，不发生缓冲堵塞

        int dislistenfd ;
        struct sockaddr_in dislistenaddr;


        dislistenfd = socket(AF_INET, SOCK_STREAM, 0);

        memset(&dislistenaddr, 0, sizeof(struct sockaddr_in));

        dislistenaddr.sin_family = AF_INET;
        dislistenaddr.sin_addr.s_addr = INADDR_ANY;
        dislistenaddr.sin_port = htons(AGG_SERVER_PORT);


        int childretbind = bind(dislistenfd,
                (struct sockaddr*)&dislistenaddr,
                sizeof(dislistenaddr));

        if (childretbind < 0) {
            fprintf(stderr, "Failed to bind\n");
            return -1;
        }

        int childretlisten = listen(dislistenfd, 5);

        if (childretlisten < 0) {
            fprintf(stderr, "Failed to listen to scoket\n");
            return -1;
        }

        int reuse = 1;
        setsockopt(dislistenfd,
                SOL_SOCKET,
                SO_REUSEADDR,
                &reuse,
                sizeof(reuse));

        setnonblock(dislistenfd);
        setnonblock(child_read);

        //管道读端,添加事件,一直获取信息
        struct bufferevent * child_pipe_read_bev = 
            bufferevent_new(child_read, child_pip_read_callback,
                    NULL, NULL, NULL);

        bufferevent_enable(child_pipe_read_bev, EV_READ);
        //set event
        event_set(&dis_server_accept_event,
                dislistenfd,
                EV_READ | EV_PERSIST,
                dis_server_accept_callback,
                NULL);

        //add event
        event_add(&dis_server_accept_event, NULL);

        //loop waiting for event and trigger call_back
        event_dispatch();

        fprintf(stderr, "loop finished\n");

        close(dislistenfd);



    } 
    //父进程用于数据收集
    if(pid > 0){

        //父经常采用libevent进行数据收集,中转所有发送器发送的请求信息
        fprintf(stderr, "data aggregator is ok !parent proc %d\n", pid);
        close(pipe_fd[0]);

        int listenfd;

        struct sockaddr_in listenaddr;

        //init a event
        struct event agg_server_accept_event;

        int reuse = 1;

        event_init();

        listenfd = socket(AF_INET, SOCK_STREAM, 0);

        if (listenfd < 0) {
            fprintf(stderr, "Failed to create listen socket\n");
            return -1;
        }

        memset(&listenaddr, 0, sizeof(listenaddr));

        listenaddr.sin_family = AF_INET;
        listenaddr.sin_addr.s_addr = INADDR_ANY;
        listenaddr.sin_port = htons(SERVER_PORT);

        int retbind = bind(listenfd,
                (struct sockaddr*)&listenaddr,
                sizeof(listenaddr));

        if (retbind < 0) {
            fprintf(stderr, "Failed to bind\n");
            char * errormsg = strerror(errno);
            fprintf(stderr, "%s\n", errormsg);
            return -1;
        }

        int retlisten = listen(listenfd, 5);

        if (retlisten < 0) {
            fprintf(stderr, "Failed to listen to scoket\n");
            return -1;
        }

        puts("waiting client");


        setsockopt(listenfd,
                SOL_SOCKET,
                SO_REUSEADDR,
                &reuse,
                sizeof(reuse));

        setnonblock(listenfd);

        //set event
        event_set(&agg_server_accept_event,
                listenfd,
                EV_READ | EV_PERSIST,
                agg_server_accept_callback,
                NULL);

        //add event
        event_add(&agg_server_accept_event, NULL);

        //loop waiting for event and trigger call_back
        event_dispatch();

        fprintf(stderr ,"loop stop\n");

        close(listenfd);
    }

    return 0;
}
